package discover

import (
	"context"
	"encoding/json"
	"fmt"
	"gitee.com/zhancaihua/goyt/core/logyt"
	"github.com/pkg/errors"
	clientv3 "go.etcd.io/etcd/client/v3"
	"strings"
)

type Endpoint struct {
	Addr string `json:"addr,omitempty"`
}
type UpdateItem struct {
	Endpoint Endpoint `json:"endpoint"`
	Key      string   `json:"key,omitempty"`
}
type UpdateOpType uint8

const (
	Put UpdateOpType = iota
	Del
)

type WatchChannel <-chan []*UpdateOp
type UpdateOp struct {
	Endpoint Endpoint
	Key      string
	Op       UpdateOpType
}

// NewManager creates an endpoint manager which implements the interface of 'Manager'.
func NewManager(client EtcdClient, target string) (*EndpointUpdateManager, error) {
	if client == nil {
		return nil, errors.New("invalid etcd client")
	}

	if target == "" {
		return nil, errors.New("invalid target")
	}

	em := &EndpointUpdateManager{
		cli:    client,
		target: target,
	}
	return em, nil
}

type EndpointUpdateManager struct {
	cli    EtcdClient
	target string
}

func Parse(target string, key string, end Endpoint) (string, error) {
	if !strings.HasPrefix(key, makeKeyPrefix(target)) {
		return "", errors.Errorf("endpoints: endpoint key should be prefixed with '%s/' got: '%s'", target, key)
	}
	up := &UpdateItem{
		Endpoint: end,
		Key:      key,
	}
	var v []byte
	var err error
	if v, err = json.Marshal(up); err != nil {
		return "", fmt.Errorf("marshal Endpoint with err %w", err)
	}
	return string(v), nil
}

// NewWatchChannel ctx cancel后协程退出，watch资源释放
func (m *EndpointUpdateManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
	res, err := m.cli.Get(ctx, makeKeyPrefix(m.target), clientv3.WithPrefix())
	if nil != err {
		return nil, fmt.Errorf("NewWatchChannel with err %w", err)
	}
	initUpdateOp := make([]*UpdateOp, 0, len(res.Kvs))
	for _, kv := range res.Kvs {
		item := &UpdateItem{}
		if err := json.Unmarshal(kv.Value, item); err != nil {
			logyt.WarnLog("unmarshal endpoint update failed", logyt.LogField{
				Key:   string(kv.Key),
				Value: err,
			})
			continue
		}
		up := &UpdateOp{
			Endpoint: item.Endpoint,
			Key:      item.Key,
			Op:       Put,
		}
		initUpdateOp = append(initUpdateOp, up)
	}
	upch := make(chan []*UpdateOp, 1)
	if len(initUpdateOp) > 0 {
		upch <- initUpdateOp
	}
	go m.watch(ctx, res.Header.Revision+1, upch)
	return upch, nil
}

func (m *EndpointUpdateManager) watch(ctx context.Context, i int64, upch chan []*UpdateOp) {
	defer close(upch)
	wch := m.cli.Watch(ctx, makeKeyPrefix(m.target), clientv3.WithRev(i), clientv3.WithPrefix())
	for {
		select {
		case <-ctx.Done():
		case wres, ok := <-wch:
			if !ok {
				logyt.WarnLog("watch closed", logyt.LogField{Key: "target", Value: m.target})
				return
			}
			if wres.Err() != nil {
				logyt.WarnLog("watch failed", logyt.LogField{Key: "target", Value: m.target}, logyt.LogField{
					Key:   "error",
					Value: wres.Err(),
				})
				return
			}

			deltaUps := make([]*UpdateOp, 0, len(wres.Events))
			for _, e := range wres.Events {
				var iup UpdateOp
				var err error
				switch e.Type {
				case clientv3.EventTypePut:
					item := &UpdateItem{}
					err = json.Unmarshal(e.Kv.Value, &item)
					iup.Op = Put
					iup.Endpoint = item.Endpoint
					iup.Key = string(e.Kv.Key)
					if err != nil {
						logyt.WarnLog("unmarshal endpoint update failed", logyt.LogField{Key: "key", Value: e.Kv.Key}, logyt.LogField{
							Key:   "error",
							Value: wres.Err(),
						})
						continue
					}
				case clientv3.EventTypeDelete:
					iup.Op = Del
					iup.Key = string(e.Kv.Key)
				default:
					continue
				}
				deltaUps = append(deltaUps, &iup)
			}
			if len(deltaUps) > 0 {
				upch <- deltaUps
			}
		}
	}
}
func makeKeyPrefix(key string) string {
	return fmt.Sprintf("%s%c", key, Delimiter)
}
